Write JSON documents to Snappy-compressed Avro files

spark-submit needs the following option:

--packages com.databricks:spark-avro_2.10:2.0.1

In [1]:
#!/usr/bin/env python
"""
File       : AvroSnappyIO.py
Author     : Luca Menichetti <luca.menichetti AT cern dot ch>
Description: Converts a set of JSONs into Avro files with Snappy compression;
             a Spark SQLContext (or HiveContext) is needed
"""

import json

class AvroSnappyIO(object):
    """Writes JSON documents as Avro files using the Snappy compression codec.

    The instance keeps a Spark context (used to parallelize the input) and a
    SQL context (used to build the DataFrame and to set the Avro codec).
    """

    def __init__(self, sparkContext, sparkSQLContext):
        """
        sparkContext: the Spark context used to parallelize the documents
        sparkSQLContext: the SQLContext (or HiveContext) used to build the DataFrame
        """
        self.sqlc = sparkSQLContext
        self.sc = sparkContext

    def file_write(self, fname, data, repartitionNumber=None, write_mode="append"):
        """
        Serialize every element of `data` with json.dumps and save the result
        in Avro format under `fname`.

        fname: output folder name, usually a HDFS path
        data: an array of JSONs
        repartitionNumber: [optional] the number of partitions used to write the output file
        write_mode: [optional] save mode passed to the DataFrame save call ("append" by default)

        Raises Exception when either the Spark context or the SQL context is missing.
        """
        if not self.sqlc or not self.sc:
            raise Exception("Both Spark Context and SQLContext must be available")
        jsonDocsDF = self.sqlc.jsonRDD(self.sc.parallelize([json.dumps(j) for j in data]))
        # Set the codec on the SQLContext held by this instance instead of
        # depending on a global named `sqlContext` existing in the caller's
        # namespace (which fails outside an interactive shell/notebook).
        self.sqlc.setConf("spark.sql.avro.compression.codec", "snappy")
        if repartitionNumber:
            jsonDocsDF = jsonDocsDF.repartition(repartitionNumber)
        jsonDocsDF.save(fname, "com.databricks.spark.avro", mode=write_mode)

In [2]:
# Build the writer from the contexts named `sc` and `sqlContext` in this session.
avro_snappy_IO = AvroSnappyIO(sparkContext=sc, sparkSQLContext=sqlContext)

In [3]:
# Three sample records (plain Python dicts) used as input for the first
# file_write call below; each one is serialized with json.dumps by the writer.
fwjr_array = [
    {"PFNArrayRef":["inputPFNs","outputPFNs","pfn"],"task":"/AbcCde_Task_Data_test_2882516/RECO","skippedFiles":[1],"wmaid":"dd33065a5371dc8a2627d4ed4f38f87a","wmats":"1.45746097721583E9","fallbackFiles":[0],"LFNArray":["/store/data/Run2011A/Cosmics/RAW/v1/157/157/157/527326916439-527326916439-527326916439.root","/store/data/Run2011A/Cosmics/RAW/v1/459/459/459/938188751161-938188751161-938188751161.root","/store/data/Run2011A/Cosmics/RAW/v1/991/991/991/823442742311-823442742311-823442742311.root","/store/data/Run2011A/Cosmics/RAW/v1/524/524/524/472239962435-472239962435-472239962435.root"],"meta_data":{"agent_ver":"1.0.14.pre5","fwjr_id":"1-0","host":"test.fnal.gov","ts":1456500229},"PFNArray":["root://eoscms.cern.ch//eos/cms/store/data/Run2011A/Cosmics/RAW/v1/000/160/960/E8099605-8853-E011-A848-0030487A18F2.root","root://eoscms.cern.ch//eos/cms/store/unmerged/CMSSW_7_0_0_pre11/Cosmics/ALCARECO/DtCalib-RECOCOSD_TaskChain_Data_pile_up_test-v1/00000/ECCFE421-08CB-E511-9F4C-02163E017804.root"],"LFNArrayRef":["fallbackFiles","outputLFNs","lfn","skippedFiles","inputLFNs"],"stype":"mongodb"},
    {"PFNArrayRef":["inputPFNs","outputPFNs","pfn"],"task":"/AbcCde_Task_Data_test_2882516/RECO","skippedFiles":[1],"wmaid":"dd33065a5371dc8a2627d4ed4f38f87a","wmats":"1.45746097721583E9","fallbackFiles":[0],"LFNArray":["/store/data/Run2011A/Cosmics/RAW/v1/157/157/157/527326916439-527326916439-527326916439.root","/store/data/Run2011A/Cosmics/RAW/v1/459/459/459/938188751161-938188751161-938188751161.root","/store/data/Run2011A/Cosmics/RAW/v1/991/991/991/823442742311-823442742311-823442742311.root","/store/data/Run2011A/Cosmics/RAW/v1/524/524/524/472239962435-472239962435-472239962435.root"],"meta_data":{"agent_ver":"1.0.14.pre5","fwjr_id":"1-0","host":"test.fnal.gov","ts":1456500229},"PFNArray":["root://eoscms.cern.ch//eos/cms/store/data/Run2011A/Cosmics/RAW/v1/000/160/960/E8099605-8853-E011-A848-0030487A18F2.root","root://eoscms.cern.ch//eos/cms/store/unmerged/CMSSW_7_0_0_pre11/Cosmics/ALCARECO/DtCalib-RECOCOSD_TaskChain_Data_pile_up_test-v1/00000/ECCFE421-08CB-E511-9F4C-02163E017804.root"],"LFNArrayRef":["fallbackFiles","outputLFNs","lfn","skippedFiles","inputLFNs"],"stype":"mongodb"},
    {"PFNArrayRef":["inputPFNs","outputPFNs","pfn"],"task":"/AbcCde_Task_Data_test_2882516/RECO","skippedFiles":[1],"wmaid":"dd33065a5371dc8a2627d4ed4f38f87a","wmats":"1.45746097721583E9","fallbackFiles":[0],"LFNArray":["/store/data/Run2011A/Cosmics/RAW/v1/157/157/157/527326916439-527326916439-527326916439.root","/store/data/Run2011A/Cosmics/RAW/v1/459/459/459/938188751161-938188751161-938188751161.root","/store/data/Run2011A/Cosmics/RAW/v1/991/991/991/823442742311-823442742311-823442742311.root","/store/data/Run2011A/Cosmics/RAW/v1/524/524/524/472239962435-472239962435-472239962435.root"],"meta_data":{"agent_ver":"1.0.14.pre5","fwjr_id":"1-0","host":"test.fnal.gov","ts":1456500229},"PFNArray":["root://eoscms.cern.ch//eos/cms/store/data/Run2011A/Cosmics/RAW/v1/000/160/960/E8099605-8853-E011-A848-0030487A18F2.root","root://eoscms.cern.ch//eos/cms/store/unmerged/CMSSW_7_0_0_pre11/Cosmics/ALCARECO/DtCalib-RECOCOSD_TaskChain_Data_pile_up_test-v1/00000/ECCFE421-08CB-E511-9F4C-02163E017804.root"],"LFNArrayRef":["fallbackFiles","outputLFNs","lfn","skippedFiles","inputLFNs"],"stype":"mongodb"}
]

In [5]:
# Write the first batch into the output folder, repartitioned to one partition.
avro_snappy_IO.file_write("test-json2avro-snappy", fwjr_array, repartitionNumber=1)


/root/spark-current/python/pyspark/sql/dataframe.py:167: UserWarning: insertInto is deprecated. Use write.save() instead.
  warnings.warn("insertInto is deprecated. Use write.save() instead.")

In [6]:
%%bash
# List the output folder to check what the write above produced.
hadoop fs -ls test-json2avro-snappy


Found 2 items
-rw-r--r--   3 lmeniche supergroup          0 2016-04-19 11:11 test-json2avro-snappy/_SUCCESS
-rw-r--r--   3 lmeniche supergroup       1709 2016-04-19 11:11 test-json2avro-snappy/part-r-00000-3c0f517c-86d9-4f40-aadd-4e9da68a2b0d.avro

In [7]:
# Second batch of sample records; the following cell writes it to the same
# output folder, relying on file_write's default "append" write mode.
fwjr_another_array = [
    {"PFNArrayRef":["inputPFNs","outputPFNs","pfn"],"task":"/AbcCde_Task_Data_test_2882516/RECO","skippedFiles":[1],"wmaid":"dd33065a5371dc8a2627d4ed4f38f87a","wmats":"1.45746097721583E9","fallbackFiles":[0],"LFNArray":["/store/data/Run2011A/Cosmics/RAW/v1/157/157/157/527326916439-527326916439-527326916439.root","/store/data/Run2011A/Cosmics/RAW/v1/459/459/459/938188751161-938188751161-938188751161.root","/store/data/Run2011A/Cosmics/RAW/v1/991/991/991/823442742311-823442742311-823442742311.root","/store/data/Run2011A/Cosmics/RAW/v1/524/524/524/472239962435-472239962435-472239962435.root"],"meta_data":{"agent_ver":"1.0.14.pre5","fwjr_id":"1-0","host":"test.fnal.gov","ts":1456500229},"PFNArray":["root://eoscms.cern.ch//eos/cms/store/data/Run2011A/Cosmics/RAW/v1/000/160/960/E8099605-8853-E011-A848-0030487A18F2.root","root://eoscms.cern.ch//eos/cms/store/unmerged/CMSSW_7_0_0_pre11/Cosmics/ALCARECO/DtCalib-RECOCOSD_TaskChain_Data_pile_up_test-v1/00000/ECCFE421-08CB-E511-9F4C-02163E017804.root"],"LFNArrayRef":["fallbackFiles","outputLFNs","lfn","skippedFiles","inputLFNs"],"stype":"mongodb"},
    {"PFNArrayRef":["inputPFNs","outputPFNs","pfn"],"task":"/AbcCde_Task_Data_test_2882516/RECO","skippedFiles":[1],"wmaid":"dd33065a5371dc8a2627d4ed4f38f87a","wmats":"1.45746097721583E9","fallbackFiles":[0],"LFNArray":["/store/data/Run2011A/Cosmics/RAW/v1/157/157/157/527326916439-527326916439-527326916439.root","/store/data/Run2011A/Cosmics/RAW/v1/459/459/459/938188751161-938188751161-938188751161.root","/store/data/Run2011A/Cosmics/RAW/v1/991/991/991/823442742311-823442742311-823442742311.root","/store/data/Run2011A/Cosmics/RAW/v1/524/524/524/472239962435-472239962435-472239962435.root"],"meta_data":{"agent_ver":"1.0.14.pre5","fwjr_id":"1-0","host":"test.fnal.gov","ts":1456500229},"PFNArray":["root://eoscms.cern.ch//eos/cms/store/data/Run2011A/Cosmics/RAW/v1/000/160/960/E8099605-8853-E011-A848-0030487A18F2.root","root://eoscms.cern.ch//eos/cms/store/unmerged/CMSSW_7_0_0_pre11/Cosmics/ALCARECO/DtCalib-RECOCOSD_TaskChain_Data_pile_up_test-v1/00000/ECCFE421-08CB-E511-9F4C-02163E017804.root"],"LFNArrayRef":["fallbackFiles","outputLFNs","lfn","skippedFiles","inputLFNs"],"stype":"mongodb"},
    {"PFNArrayRef":["inputPFNs","outputPFNs","pfn"],"task":"/AbcCde_Task_Data_test_2882516/RECO","skippedFiles":[1],"wmaid":"dd33065a5371dc8a2627d4ed4f38f87a","wmats":"1.45746097721583E9","fallbackFiles":[0],"LFNArray":["/store/data/Run2011A/Cosmics/RAW/v1/157/157/157/527326916439-527326916439-527326916439.root","/store/data/Run2011A/Cosmics/RAW/v1/459/459/459/938188751161-938188751161-938188751161.root","/store/data/Run2011A/Cosmics/RAW/v1/991/991/991/823442742311-823442742311-823442742311.root","/store/data/Run2011A/Cosmics/RAW/v1/524/524/524/472239962435-472239962435-472239962435.root"],"meta_data":{"agent_ver":"1.0.14.pre5","fwjr_id":"1-0","host":"test.fnal.gov","ts":1456500229},"PFNArray":["root://eoscms.cern.ch//eos/cms/store/data/Run2011A/Cosmics/RAW/v1/000/160/960/E8099605-8853-E011-A848-0030487A18F2.root","root://eoscms.cern.ch//eos/cms/store/unmerged/CMSSW_7_0_0_pre11/Cosmics/ALCARECO/DtCalib-RECOCOSD_TaskChain_Data_pile_up_test-v1/00000/ECCFE421-08CB-E511-9F4C-02163E017804.root"],"LFNArrayRef":["fallbackFiles","outputLFNs","lfn","skippedFiles","inputLFNs"],"stype":"mongodb"}
]

In [8]:
# Write the second batch into the same folder (default write mode is "append").
avro_snappy_IO.file_write("test-json2avro-snappy", fwjr_another_array, repartitionNumber=1)

In [9]:
%%bash
# List the folder again; after the second write an additional part file should be present.
hadoop fs -ls test-json2avro-snappy


Found 3 items
-rw-r--r--   3 lmeniche supergroup          0 2016-04-19 11:11 test-json2avro-snappy/_SUCCESS
-rw-r--r--   3 lmeniche supergroup       1709 2016-04-19 11:11 test-json2avro-snappy/part-r-00000-3c0f517c-86d9-4f40-aadd-4e9da68a2b0d.avro
-rw-r--r--   3 lmeniche supergroup       1709 2016-04-19 11:11 test-json2avro-snappy/part-r-00000-db206d0c-aada-4491-b8d6-57835c97ddb6.avro

In [10]:
import json

In [11]:
# Parse one record from the fwjr_prod.json file; the `with` block closes the
# file handle as soon as parsing is done instead of leaving it open.
with open('/afs/cern.ch/user/l/lmeniche/work-ws-link/tmp/fwjr_prod.json') as fwjr_file:
    rec = json.load(fwjr_file)
# The batch holds the same record twice (both entries reference one object).
fwjr_array = [rec, rec]
avro_snappy_IO.file_write("test-json2avro-snappy",fwjr_array, 1)

Write to the local filesystem


In [13]:
# Same write, but targeting a file:// URI instead of the default filesystem.
avro_snappy_IO.file_write("file:///root/test-local-json2avro-snappy", fwjr_array, repartitionNumber=1)

In [14]:
%%bash
# List the local directory targeted by the file:// write above.
ls /root/test-local-json2avro-snappy/


part-r-00000-f5eb56b0-3d32-4e3d-8f3c-a6d969fd7114.avro
_SUCCESS

In [ ]: